#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import functools
import json
import logging
import os
import sys
import traceback
import warnings
from importlib import metadata
from typing import TYPE_CHECKING, Any, Callable

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # noqa: F401
from airflow.exceptions import AirflowInternalRuntimeError, RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine
    from sqlalchemy.orm import Session as SASession

    from airflow.www.utils import UIAlert

log = logging.getLogger(__name__)

try:
    if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
        TIMEZONE = parse_timezone(tz)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)


HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine
Session: Callable[..., SASession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json

# Dictionary containing State and colors associated to each state to
# display on the Webserver
STATE_COLORS = {
    "deferred": "mediumpurple",
    "failed": "red",
    "queued": "gray",
    "removed": "lightgrey",
    "restarting": "violet",
    "running": "lime",
    "scheduled": "tan",
    "skipped": "hotpink",
    "success": "green",
    "up_for_reschedule": "turquoise",
    "up_for_retry": "gold",
    "upstream_failed": "orange",
    "shutdown": "blue",
}


@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need it
    from rich.markup import escape

    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message))}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(functools.partial(replace_showwarning, original_show_warning))

POLICY_PLUGIN_MANAGER: Any = None  # type: ignore


def task_policy(task):
    return POLICY_PLUGIN_MANAGER.hook.task_policy(task=task)


def dag_policy(dag):
    return POLICY_PLUGIN_MANAGER.hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return POLICY_PLUGIN_MANAGER.hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return POLICY_PLUGIN_MANAGER.hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


def configure_policy_plugin_manager():
    global POLICY_PLUGIN_MANAGER

    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc  here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS
    SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from 'airflow.task' logger to the root Logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of OpenLineage provider package and Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage
    provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass


class TracebackSession:
    """
    Session that throws error when you try to use it.

    Also stores stack at instantiation call site.

    :meta private:
    """

    def __init__(self):
        self.traceback = traceback.extract_stack()

    def __getattr__(self, item):
        raise RuntimeError(
            "TracebackSession object was used but internal API is enabled. "
            "You'll need to ensure you are making only RPC calls with this object. "
            "The stack list below will show where the TracebackSession object was created."
            + "\n".join(traceback.format_list(self.traceback))
        )

    def remove(*args, **kwargs):
        pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow.utils.log.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global Session
    global engine
    from airflow.api_internal.internal_api_call import InternalApiConfig

    if InternalApiConfig.get_use_internal_api():
        Session = TracebackSession
        engine = None
        return
    elif os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    if conf.has_option("database", "sql_alchemy_connect_args"):
        connect_args = conf.getimport("database", "sql_alchemy_connect_args")
    else:
        connect_args = {}

    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

    mask_secret(engine.url.password)

    setup_event_handlers(engine)

    Session = scoped_session(
        sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=engine,
            expire_on_commit=False,
        )
    )


DEFAULT_ENGINE_ARGS = {
    "postgresql": {
        "executemany_mode": "values_plus_batch",
        "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
}


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)  # type: ignore

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html"

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args


def dispose_orm():
    """Properly close pooled database connections."""
    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
    global engine
    global Session

    if Session is not None:  # type: ignore[truthy-function]
        Session.remove()
        Session = None
    if engine:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            import MySQLdb.converters

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def validate_session():
    """Validate ORM Session."""
    global engine

    worker_precheck = conf.getboolean("celery", "worker_precheck")
    if not worker_precheck:
        return True
    else:
        check_session = sessionmaker(bind=engine)
        session = check_session()
        try:
            session.execute(text("select 1"))
            conn_status = True
        except exc.DBAPIError as err:
            log.error(err)
            conn_status = False
        session.close()
        return conn_status


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath():
    """Ensure certain subfolders of AIRFLOW_HOME are on the classpath."""
    if DAGS_FOLDER not in sys.path:
        sys.path.append(DAGS_FOLDER)

    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def get_session_lifetime_config():
    """Get session timeout configs and handle outdated configs gracefully."""
    session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
    session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
    uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
        "webserver", "force_log_out_after", fallback=None
    )

    minutes_per_day = 24 * 60
    default_lifetime_minutes = "43200"
    if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
        warnings.warn(
            "`session_lifetime_days` option from `[webserver]` section has been "
            "renamed to `session_lifetime_minutes`. The new option allows to configure "
            "session lifetime in minutes. The `force_log_out_after` option has been removed "
            "from `[webserver]` section. Please update your configuration.",
            category=RemovedInAirflow3Warning,
            stacklevel=2,
        )
        if session_lifetime_days:
            session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)

    if not session_lifetime_minutes:
        session_lifetime_days = 30
        session_lifetime_minutes = minutes_per_day * session_lifetime_days

    log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

    return int(session_lifetime_minutes)


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        if "policy" in names and "task_policy" not in names:
            warnings.warn(
                "Using `policy` in airflow_local_settings.py is deprecated. "
                "Please rename your `policy` to `task_policy`.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            setattr(airflow_local_settings, "task_policy", airflow_local_settings.policy)
            names.remove("policy")

        plugin_functions = policies.make_plugin_from_local_settings(
            POLICY_PLUGIN_MANAGER, airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath()
    configure_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want anything
    # in airflow_local_settings to take precendec
    load_policy_plugins(POLICY_PLUGIN_MANAGER)
    import_local_settings()
    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    State.state_color.update(STATE_COLORS)

    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Run any custom runtime checks that needs to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


def is_usage_data_collection_enabled() -> bool:
    """Check if data collection is enabled."""
    return conf.getboolean("usage_data_collection", "enabled", fallback=True) and (
        os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
    )


# Const stuff

KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}


# Updating serialized DAG can not be faster than a minimum interval to reduce database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs is compressed before writing to DB,
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

# Fetching serialized DAG can not be faster than a minimum interval to reduce database
# read rate. This config controls when your DAGs are updated in the Webserver
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

# Whether or not to check each dagrun against defined SLAs
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
    executor_constants.KUBERNETES_EXECUTOR,
    executor_constants.CELERY_KUBERNETES_EXECUTOR,
    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# By default this is off, but is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the message, category, and roles the
# message should be shown to. For example:
#   from airflow.www.utils import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert("Welcome to Airflow"),  # All users
#       UIAlert("Airflow update happening next week", roles=["User"]),  # Only users with the User role
#       # A flash message with html:
#       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
    "true",
    "t",
    "yes",
    "y",
    "1",
}
